package com.tt.notify.producer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.tt.notify.common.config.MQConfiguration;
import com.tt.notify.common.constant.MQ;
import com.tt.notify.common.constant.MQ.Topic;
import com.tt.notify.common.exception.MQException;


/**
 * 消息生产者
 * @author  liuhaihui
 * @date    2017年7月13日 下午6:40:49
 * @version
 */
@Component
public class MQProducer {
	
	private final Logger logger = LoggerFactory.getLogger(MQProducer.class);

	/**
	 * 生产者的配置注入P2PConfiguration,BoardCastConfiguration都一样，随便注入一个即可
	 */
	@Autowired
	private MQConfiguration config;

	/**
	 * [2]同一个GroupName消费只对应的Topic的情况
	 */
	private ConcurrentHashMap<MQ.Topic, DefaultMQProducer> defaultMQProducerMap
			= new ConcurrentHashMap<MQ.Topic, DefaultMQProducer>();
	
	@PostConstruct
	public void init() throws MQClientException {
		for (Topic topic : MQ.Topic.values()) {
			DefaultMQProducer topicRefenceMQProducer = new DefaultMQProducer(topic.getProducerAndConsumerGroupName());
			topicRefenceMQProducer.setNamesrvAddr(config.getNamesrvAddr());
			//defaultMQProducer.setInstanceName(config.getInstanceName());
			// 关闭VIP通道，避免出现connect to <:10909> failed导致消息发送失败
			//defaultMQProducer.setVipChannelEnabled(false);
			topicRefenceMQProducer.start();
			//config.getProducerGroup()
			logger.info("消费者:consumerGroup={},namesrvAddr={}", topic.getProducerAndConsumerGroupName(), config.getNamesrvAddr());
			defaultMQProducerMap.put(topic, topicRefenceMQProducer);
		}
		logger.info("生产者启动成功|RocketMQ Producer start success");
	}
	
	/**
	 * 应用退出时，要调用shutdown来清理资源，关闭网络连接，从MetaQ服务器上注销自己 
     * 注意：我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
     * Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {  
	        public void run() {  
	            producer.shutdown();  
	        }  
	    }));
	 */
	@PreDestroy
	public void destroy() {
		for(Map.Entry<MQ.Topic, DefaultMQProducer> entrySet : defaultMQProducerMap.entrySet()){
			entrySet.getValue().shutdown();
		}
	}

	/**
	 * 发送消息
	 * @param msg
	 * @return
	 * @throws MQClientException
	 * @throws RemotingException
	 * @throws MQBrokerException
	 * @throws InterruptedException
	 */
	public SendResult send(final MQ.Topic topic, final MQ.Tag tag, final String message) {
		try {
			Message msg = new Message(topic.getName(), tag.getName(), "RMQKEY20170816193746", message.getBytes());
			return defaultMQProducerMap.get(topic).send(msg);
		} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
			throw new MQException(e);
		}
	}
}
